// Copyright Epic Games, Inc. All Rights Reserved.

#pragma once

#include <zencore/iohash.h>
#include <zencore/logbase.h>
#include <zencore/thread.h>
#include <zenstore/caslog.h>
#include <zenstore/scrubcontext.h>

ZEN_THIRD_PARTY_INCLUDES_START
#include <fmt/format.h>
ZEN_THIRD_PARTY_INCLUDES_END

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <ctime>
#include <filesystem>
#include <functional>
#include <optional>
#include <span>
#include <thread>

ZEN_THIRD_PARTY_INCLUDES_START
#include <tsl/robin_set.h>
ZEN_THIRD_PARTY_INCLUDES_END

namespace zen {

// Forward declarations of types that are referenced by the declarations in this header
class CidStore;
class GcManager;
class HashKeySet;
class ScrubContext;
struct IoHash;
struct DiskSpace;

/** GC clock

	Wraps std::chrono::system_clock and exposes time both as chrono time points
	and as raw integer tick counts (in units of Clock::duration).
 */
class GcClock
{
public:
	using Clock		= std::chrono::system_clock;
	using TimePoint = Clock::time_point;
	using Duration	= Clock::duration;
	using Tick		= int64_t;

	// Current time as reported by Clock
	static TimePoint Now() { return Clock::now(); }

	// Current time expressed as the number of ticks since the clock's epoch
	static Tick TickCount()
	{
		const TimePoint Current = Now();
		return Current.time_since_epoch().count();
	}

	// Rebuilds the time point that lies TickCount ticks after the clock's epoch
	static TimePoint TimePointFromTick(const Tick TickCount)
	{
		const Duration SinceEpoch{TickCount};
		return TimePoint{SinceEpoch};
	}
};

//////// Begin GC V2

// Options for one GC V2 pass (see GcManager::CollectGarbage(const GcSettings&))
struct GcSettings
{
	// Both expire times default to the moment the settings object is created
	GcClock::TimePoint CacheExpireTime{GcClock::Now()};
	GcClock::TimePoint ProjectStoreExpireTime{GcClock::Now()};

	bool CollectSmallObjects{false};
	bool IsDeleteMode{false};
	bool SkipCidDelete{false};
	bool Verbose{false};
	bool SingleThread{false};

	// 0 = compact only empty eligible blocks, 100 = compact all non-full eligible blocks,
	// 1-99 = compact eligible blocks with less usage than CompactBlockUsageThresholdPercent
	uint32_t CompactBlockUsageThresholdPercent{90};

	std::filesystem::path DiskReservePath;
};

// Result of a store compaction step; both counters start at zero
struct GcCompactStoreStats
{
	std::uint64_t RemovedDisk{0};
	std::chrono::milliseconds ElapsedMS{};
};

// Counters collected while removing data during GC; everything starts at zero
struct GcStats
{
	std::uint64_t CheckedCount{0};
	std::uint64_t FoundCount{0};
	std::uint64_t DeletedCount{0};
	std::uint64_t FreedMemory{0};
	std::chrono::milliseconds ElapsedMS{};
};

// Statistics gathered for one GcReferencer: its expired-data removal, its store compaction
// and the time spent in the individual reference checker phases
struct GcReferencerStats
{
	GcStats RemoveExpiredDataStats;
	GcCompactStoreStats CompactStoreStats;

	std::chrono::milliseconds CreateReferenceCheckersMS{};
	std::chrono::milliseconds PreCacheStateMS{};
	std::chrono::milliseconds UpdateLockedStateMS{};
	std::chrono::milliseconds ElapsedMS{};
};

// Statistics gathered for one GcReferenceStore: its unreferenced-data removal, its store
// compaction and the time spent creating reference pruners
struct GcReferenceStoreStats
{
	GcStats RemoveUnreferencedDataStats;
	GcCompactStoreStats CompactStoreStats;

	std::chrono::milliseconds CreateReferencePrunersMS{};
	std::chrono::milliseconds ElapsedMS{};
};

// Outcome of a GC V2 run: per-participant statistics keyed by name, their sums,
// and wall-clock timings for each phase
struct GcResult
{
	std::vector<std::pair<std::string, GcReferencerStats>> ReferencerStats;
	std::vector<std::pair<std::string, GcReferenceStoreStats>> ReferenceStoreStats;

	GcReferencerStats ReferencerStatSum;
	GcReferenceStoreStats ReferenceStoreStatSum;
	GcCompactStoreStats CompactStoresStatSum;

	// Wall times, not sum of each
	std::chrono::milliseconds RemoveExpiredDataMS{};
	std::chrono::milliseconds CreateReferenceCheckersMS{};
	std::chrono::milliseconds PreCacheStateMS{};
	std::chrono::milliseconds LockStateMS{};
	std::chrono::milliseconds UpdateLockedStateMS{};

	std::chrono::milliseconds CreateReferencePrunersMS{};
	std::chrono::milliseconds RemoveUnreferencedDataMS{};
	std::chrono::milliseconds CompactStoresMS{};

	std::chrono::milliseconds WriteBlockMS{};

	std::chrono::milliseconds ElapsedMS{};

	bool WasCancelled{false};
};

class CbObjectWriter;

// Declared here, defined elsewhere.
// NOTE(review): presumably serializes Result into Writer; HumanReadable and IncludeDetails look like
// formatting/verbosity switches - confirm against the implementation.
void WriteGCResult(CbObjectWriter& Writer, const GcResult& Result, bool HumanReadable, bool IncludeDetails);

// Context handed to every GC V2 interface call: an immutable copy of the settings
// together with a reference to a cancellation flag that lives outside this struct
struct GcCtx
{
	const GcSettings Settings;
	std::atomic<bool>& IsCancelledFlag;
};

// Set of IoHash values, used by GcReferenceChecker::RemoveUsedReferencesFromSet
using HashSet = tsl::robin_set<IoHash>;

/**
 * @brief An interface to remove the stored data on disk after a GcReferencer::RemoveExpiredData and
 * GcReferencePruner::RemoveUnreferencedData
 *
 * CompactStore is called after state locking is complete, so the implementor must take care to only remove
 * data that has not been altered since the prune operation.
 *
 * Instance will be deleted after CompactStore has completed execution.
 *
 * The subclass constructor should be provided with information on what is intended to be removed.
 */
class GcStoreCompactor
{
public:
	virtual ~GcStoreCompactor() = default;

	// Remove data on disk based on results from GcReferencer::RemoveExpiredData or
	// GcReferencePruner::RemoveUnreferencedData (both return a GcStoreCompactor).
	// Stats is passed by non-const reference for the implementation to fill in.
	// NOTE(review): ClaimDiskReserveCallback presumably releases the disk reserve and returns the number of
	// bytes made available - confirm against the caller.
	virtual void CompactStore(GcCtx& Ctx, GcCompactStoreStats& Stats, const std::function<uint64_t()>& ClaimDiskReserveCallback) = 0;

	// Name identifying this compactor.
	// NOTE(review): presumably the key used for the per-participant entries in GcResult - confirm.
	virtual std::string GetGcName(GcCtx& Ctx) = 0;
};

/**
 * @brief An interface to check if a set of Cids are referenced
 *
 * Instance will be deleted after RemoveUsedReferencesFromSet has been called 0-n times.
 *
 * During construction of the GcReferenceChecker the world is not stopped and this is a good
 * place to do caching to be able to execute LockState and RemoveUsedReferencesFromSet quickly.
 */
class GcReferenceChecker
{
public:
	// Destructor should unlock what was locked in LockState
	virtual ~GcReferenceChecker() = default;

	// Name identifying this checker
	virtual std::string GetGcName(GcCtx& Ctx) = 0;

	// Read as much of the current state as possible - nothing is locked for you here, so you need to lock as appropriate
	virtual void PreCache(GcCtx& Ctx) = 0;

	// Update the state after all ReferenceCheckers have completed PreCache and all ReferenceLockers have
	// completed their LockState operation.
	// At this stage all data that UpdateLockedState needs to touch should be locked by the ReferenceLockers.
	// *IMPORTANT* Do *not* take any locks (shared or exclusive) in this code.
	// This is because we need to acquire the locks in an ordered manner and not end up in a deadlock due to other code
	// trying to get exclusive locks halfway through our execution.
	// Called once before any calls to RemoveUsedReferencesFromSet.
	// The implementation should be as fast as possible, as UpdateLockedState is part of a stop the world (from changes)
	// phase that lasts until UpdateLockedState has completed for all instances of GcReferenceChecker
	virtual void UpdateLockedState(GcCtx& Ctx) = 0;

	// Go through IoCids and see which ones are referenced. Any Cid that is referenced must be removed from IoCids.
	// This function should use pre-cached information on what is referenced, as we are in stop the world mode
	virtual void RemoveUsedReferencesFromSet(GcCtx& Ctx, HashSet& IoCids) = 0;
};

/**
 * @brief An interface to implement a lock for Stop The World (from writing new data)
 *
 * This interface is registered/unregistered to GcManager via AddGcReferenceLocker() and RemoveGcReferenceLocker()
 */
class GcReferenceLocker
{
public:
	virtual ~GcReferenceLocker() = default;

	// Take all the locks needed to execute UpdateLockedState for all the GcReferenceChecker instances in your domain.
	// Once every GcReferenceChecker has executed UpdateLockedState, and RemoveUsedReferencesFromSet has completed
	// for all domains, the locks will be disposed and writes are allowed once again.
	// The acquired locks are handed back to the caller as shared lock scopes.
	virtual std::vector<RwLock::SharedLockScope> LockState(GcCtx& Ctx) = 0;
};

/**
 * @brief Interface to handle GC of data that references Cid data
 *
 * This interface is registered/unregistered to GcManager via AddGcReferencer() and RemoveGcReferencer()
 */
class GcReferencer
{
protected:
	// Protected: instances cannot be deleted through a GcReferencer pointer
	virtual ~GcReferencer() = default;

public:
	// Name identifying this referencer
	virtual std::string GetGcName(GcCtx& Ctx) = 0;

	// Remove expired data based on either GcCtx::Settings CacheExpireTime or ProjectStoreExpireTime.
	// Returns a GcStoreCompactor as a raw pointer.
	// NOTE(review): presumably the caller takes ownership and the result may be null when there is nothing
	// to compact - confirm against the caller.
	virtual GcStoreCompactor* RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) = 0;

	// Create 0-n GcReferenceChecker for this GcReferencer. Caller will manage lifetime of
	// returned instances
	virtual std::vector<GcReferenceChecker*> CreateReferenceCheckers(GcCtx& Ctx) = 0;
};

/**
 * @brief Interface to prune - remove pointers to data but not the bulk data on disk - references from a GcReferenceStore
 */
class GcReferencePruner
{
public:
	virtual ~GcReferencePruner() = default;

	// Name identifying this pruner
	virtual std::string GetGcName(GcCtx& Ctx) = 0;

	// Callback that takes a span of references and returns a vector of references
	// (the unused ones, going by how RemoveUnreferencedData describes it below)
	typedef std::function<std::vector<IoHash>(std::span<IoHash> References)> GetUnusedReferencesFunc;

	// Check a set of references to see if they are in use.
	// Use the GetUnusedReferences input function to check if references are used and update any pointers
	// so that any query for references determined to be unreferenced will not find them.
	// If any references are found to be unused, return a GcStoreCompactor instance which will
	// clean up any stored bulk data mapping to the pruned references.
	// Caller will manage lifetime of returned instance.
	// This function should execute as fast as possible, so try to prepare a list of references to check ahead of
	// the call to this function and make sure the removal of unreferenced items is as lightweight as possible.
	virtual GcStoreCompactor* RemoveUnreferencedData(GcCtx& Ctx, GcStats& Stats, const GetUnusedReferencesFunc& GetUnusedReferences) = 0;
};

/**
 * @brief An interface to prune referenced (Cid) data from a store
 *
 * GcManager declares AddGcReferenceStore() and RemoveGcReferenceStore() taking this interface
 */
class GcReferenceStore
{
protected:
	// Protected: instances cannot be deleted through a GcReferenceStore pointer
	virtual ~GcReferenceStore() = default;

public:
	// Name identifying this reference store
	virtual std::string GetGcName(GcCtx& Ctx) = 0;

	// Create a GcReferencePruner which can check a set of references (decided by implementor) if they are no longer in use
	// Caller will manage lifetime of returned instance
	virtual GcReferencePruner* CreateReferencePruner(GcCtx& Ctx, GcReferenceStoreStats& Stats) = 0;
};

//////// End GC V2

/** Garbage Collection context object

	Passed to GcContributor::GatherReferences and GcStorage::CollectGarbage.
	All state lives in a GcState object that is defined outside this header and held
	through m_State (pimpl), so only the accessors below are visible here.
 */
class GcContext
{
public:
	GcContext(const GcClock::TimePoint& CacheExpireTime, const GcClock::TimePoint& ProjectStoreExpireTime);
	~GcContext();

	// NOTE(review): presumably records Cids that must be kept by the collector - confirm
	void AddRetainedCids(std::span<const IoHash> Cid);
	// Stores the expired keys for CacheKeyContext; the vector is taken by rvalue reference
	void SetExpiredCacheKeys(const std::string& CacheKeyContext, std::vector<IoHash>&& ExpiredKeys);

	void IterateCids(std::function<void(const IoHash&)> Callback);

	// Two filter flavours: KeepFunc receives only a hash, FilterFunc receives a hash plus a bool.
	// NOTE(review): the meaning of the bool is defined by the implementation - confirm before relying on it
	void FilterCids(std::span<const IoHash> Cid, std::function<void(const IoHash&)> KeepFunc);
	void FilterCids(std::span<const IoHash> Cid, std::function<void(const IoHash&, bool)>&& FilterFunc);

	void			  AddDeletedCids(std::span<const IoHash> Cas);
	const HashKeySet& DeletedCids();

	std::span<const IoHash> ExpiredCacheKeys(const std::string& CacheKeyContext) const;

	// Getter/setter pairs for the collection options
	bool SkipCid() const;
	void SetSkipCid(bool NewState);

	bool IsDeletionMode() const;
	void SetDeletionMode(bool NewState);

	bool CollectSmallObjects() const;
	void CollectSmallObjects(bool NewState);

	GcClock::TimePoint CacheExpireTime() const;
	GcClock::TimePoint ProjectStoreExpireTime() const;

	void	 DiskReservePath(const std::filesystem::path& Path);
	// NOTE(review): presumably releases the reserve at DiskReservePath and returns its size in bytes - confirm
	uint64_t ClaimGCReserve();

private:
	struct GcState;

	std::unique_ptr<GcState> m_State;
};

/** GC root contributor

	Higher level data structures provide roots for the garbage collector,
	which ultimately determine what is garbage and what data we need to
	retain.

	Registered with GcManager through AddGcContributor() / RemoveGcContributor().

 */
class GcContributor
{
public:
	// Called with the context of the current collection so the contributor can report its references
	virtual void GatherReferences(GcContext& GcCtx) = 0;

protected:
	// Protected: instances cannot be deleted through a GcContributor pointer
	virtual ~GcContributor() {}
};

// Pair of size counters (disk and memory), both starting at zero
struct GcStorageSize
{
	uint64_t DiskSize = 0;
	uint64_t MemorySize = 0;
};

/** GC storage provider

	Registered with GcManager through AddGcStorage() / RemoveGcStorage().
 */
class GcStorage
{
public:
	virtual void		  ScrubStorage(ScrubContext& ScrubCtx) = 0;
	virtual void		  CollectGarbage(GcContext& GcCtx)	   = 0;
	// Reports the provider's size as a disk/memory pair
	virtual GcStorageSize StorageSize() const				   = 0;

protected:
	// Protected: instances cannot be deleted through a GcStorage pointer
	virtual ~GcStorage() {}
};

/** Interface for querying if we are running low on disk space, used to deny put/writes to disk

	Instances are handed around as `const DiskWriteBlocker*` (see GcManager::SetDiskWriteBlocker),
	so the destructor is virtual to keep destruction through the interface well defined.
 */
class DiskWriteBlocker
{
public:
	virtual ~DiskWriteBlocker() = default;

	// Returns true when writes to disk are currently permitted
	virtual bool AreDiskWritesAllowed() const = 0;
};

/** GC orchestrator

	Holds the registered participants of both GC flavours declared in this header:
	GcReferencer / GcReferenceLocker / GcReferenceStore for GC V2, and
	GcContributor / GcStorage for the original GC.
 */
class GcManager
{
public:
	GcManager();
	~GcManager();

	//////// Begin GC V2

	// Registration of GC V2 participants. They are passed by reference and the manager keeps
	// vectors of raw pointers (m_GcReferencers, m_GcReferencerLockers, m_GcReferenceStores).
	// NOTE(review): presumably a participant must be removed before it is destroyed - confirm.
	void AddGcReferencer(GcReferencer& Referencer);
	void RemoveGcReferencer(GcReferencer& Referencer);

	void AddGcReferenceLocker(GcReferenceLocker& ReferenceLocker);
	void RemoveGcReferenceLocker(GcReferenceLocker& ReferenceLocker);

	void AddGcReferenceStore(GcReferenceStore& ReferenceStore);
	void RemoveGcReferenceStore(GcReferenceStore& ReferenceStore);

	// Runs a GC V2 pass with the given settings and returns its statistics
	GcResult CollectGarbage(const GcSettings& Settings);
	// NOTE(review): presumably stores CancelFlag in m_CancelGC - confirm against the implementation
	void	 SetCancelGC(bool CancelFlag);

	//////// End GC V2

	void AddGcContributor(GcContributor* Contributor);
	void RemoveGcContributor(GcContributor* Contributor);

	void AddGcStorage(GcStorage* Contributor);
	void RemoveGcStorage(GcStorage* Contributor);

	GcStorageSize CollectGarbage(GcContext& GcCtx);
	void		  ScrubStorage(ScrubContext& GcCtx);

	GcStorageSize TotalStorageSize() const;

	// Plain accessors for a non-owning pointer; no locking is done here
	const DiskWriteBlocker* GetDiskWriteBlocker() { return m_DiskWriteBlocker; }
	void					SetDiskWriteBlocker(const DiskWriteBlocker* Monitor) { m_DiskWriteBlocker = Monitor; }

private:
	// Reads the atomic cancel flag
	bool						CheckGCCancel() { return m_CancelGC.load(); }
	LoggerRef					Log() { return m_Log; }
	LoggerRef					m_Log;
	mutable RwLock				m_Lock;
	std::vector<GcContributor*> m_GcContribs;
	std::vector<GcStorage*>		m_GcStorage;
	CidStore*					m_CidStore		   = nullptr;
	const DiskWriteBlocker*		m_DiskWriteBlocker = nullptr;

	// GC V2 participants
	std::vector<GcReferencer*>		m_GcReferencers;
	std::vector<GcReferenceLocker*> m_GcReferencerLockers;
	std::vector<GcReferenceStore*>	m_GcReferenceStores;

	std::atomic_bool m_CancelGC{false};
};

// Scheduler state as returned by GcScheduler::Status()
enum class GcSchedulerStatus : uint32_t
{
	kIdle	 = 0,
	kRunning = 1,
	kStopped = 2
};

// Selects which GC implementation to run
enum class GcVersion : uint32_t
{
	kV1 = 0,
	kV2 = 1
};

// Configuration passed to GcScheduler::Initialize
struct GcSchedulerConfig
{
	std::filesystem::path RootDirectory;

	std::chrono::seconds MonitorInterval = std::chrono::seconds(30);
	std::chrono::seconds Interval = std::chrono::seconds::zero();
	std::chrono::seconds MaxCacheDuration = std::chrono::hours(24);				// 86400 seconds
	std::chrono::seconds MaxProjectStoreDuration = std::chrono::hours(24 * 7);	// 604800 seconds

	bool CollectSmallObjects{true};
	bool Enabled{true};

	uint64_t DiskReserveSize{uint64_t{1} << 28};  // 256 MiB
	uint64_t DiskSizeSoftLimit{0};
	uint64_t MinimumFreeDiskSpaceToAllowWrites{uint64_t{1} << 28};	// 256 MiB

	std::chrono::seconds LightweightInterval = std::chrono::seconds::zero();

	GcVersion UseGCVersion{GcVersion::kV1};
	uint32_t CompactBlockUsageThresholdPercent{90};
	bool Verbose{false};
	bool SingleThreaded{false};
};

// Snapshot of the scheduler as returned by GcScheduler::GetState(): status, configuration,
// disk figures and the outcome of the most recent lightweight and full GC runs.
// Every member has an initializer so a default-constructed state never holds indeterminate values.
struct GcSchedulerState
{
	GcSchedulerStatus Status = GcSchedulerStatus::kIdle;

	GcSchedulerConfig Config;

	bool				 AreDiskWritesBlocked = false;
	bool				 HasDiskReserve		  = false;
	uint64_t			 DiskSize			  = 0;
	uint64_t			 DiskUsed			  = 0;
	uint64_t			 DiskFree			  = 0;
	GcClock::TimePoint	 LastFullGcTime{};
	GcClock::TimePoint	 LastLightweightGcTime{};
	// std::chrono durations are left uninitialized by default-initialization, hence the explicit {}
	std::chrono::seconds RemainingTimeUntilLightweightGc{};
	std::chrono::seconds RemainingTimeUntilFullGc{};
	uint64_t			 RemainingSpaceUntilFullGC = 0;

	std::chrono::milliseconds LastFullGcDuration{};
	GcStorageSize			  LastFullGCDiff;
	std::chrono::milliseconds LastLightweightGcDuration{};
	GcStorageSize			  LastLightweightGCDiff;

	// Only populated for GC V2 runs
	std::optional<GcResult> LastLightweightGCV2Result;
	std::optional<GcResult> LastFullGCV2Result;
};

// In-memory list of disk usage samples plus queries over it
class DiskUsageWindow
{
public:
	// One sample: disk usage at a given tick
	struct DiskUsageEntry
	{
		GcClock::Tick SampleTime;
		uint64_t	  DiskUsage;
	};

	std::vector<DiskUsageEntry> m_LogWindow;

	// Adds a sample to the end of the window (copying overload)
	void Append(const DiskUsageEntry& Entry) { m_LogWindow.emplace_back(Entry); }

	// Adds a sample to the end of the window (moving overload)
	void Append(DiskUsageEntry&& Entry) { m_LogWindow.push_back(std::move(Entry)); }

	void KeepRange(GcClock::Tick StartTick, GcClock::Tick EndTick);

	std::vector<uint64_t> GetDiskDeltas(GcClock::Tick StartTick, GcClock::Tick EndTick, GcClock::Tick DeltaWidth, uint64_t& OutMaxDelta) const;

	GcClock::Tick FindTimepointThatRemoves(uint64_t Amount, GcClock::Tick EndTick) const;
};

/**
 * GC scheduler
 *
 * Owns a thread (m_GcThread, entry point SchedulerThread) and works against the GcManager it is
 * constructed with. Privately implements DiskWriteBlocker: AreDiskWritesAllowed() reports the
 * inverse of the atomic m_AreDiskWritesBlocked flag.
 */
class GcScheduler : private DiskWriteBlocker
{
public:
	// NOTE(review): single-argument constructor is not explicit - consider marking it explicit
	GcScheduler(GcManager& GcManager);
	~GcScheduler();

	void			  Initialize(const GcSchedulerConfig& Config);
	void			  Shutdown();
	// Current status, read from the atomic m_Status
	GcSchedulerStatus Status() const { return static_cast<GcSchedulerStatus>(m_Status.load()); }
	GcSchedulerState  GetState() const;

	// Parameters for an explicitly requested GC run. The std::optional members are unset by default.
	// NOTE(review): presumably an unset optional falls back to the scheduler configuration - confirm.
	struct TriggerGcParams
	{
		bool					 CollectSmallObjects	 = false;
		std::chrono::seconds	 MaxCacheDuration		 = std::chrono::seconds::max();
		std::chrono::seconds	 MaxProjectStoreDuration = std::chrono::seconds::max();
		uint64_t				 DiskSizeSoftLimit		 = 0;
		bool					 SkipCid				 = false;
		bool					 SkipDelete				 = false;
		std::optional<GcVersion> ForceGCVersion;
		std::optional<uint32_t>	 CompactBlockUsageThresholdPercent;
		std::optional<bool>		 Verbose;
		std::optional<bool>		 SingleThreaded;
	};

	// NOTE(review): the meaning of the bool result is defined by the implementation - confirm
	bool TriggerGc(const TriggerGcParams& Params);

	// Parameters for an explicitly requested scrub
	struct TriggerScrubParams
	{
		bool				 SkipGc		  = false;
		std::chrono::seconds MaxTimeslice = std::chrono::seconds::max();
		bool				 SkipDelete	  = false;
		bool				 SkipCas	  = false;
	};

	bool TriggerScrub(const TriggerScrubParams& Params);

	bool CancelGC();

private:
	void		 SchedulerThread();
	void		 CollectGarbage(const GcClock::TimePoint& CacheExpireTime,
								const GcClock::TimePoint& ProjectStoreExpireTime,
								bool					  Delete,
								bool					  CollectSmallObjects,
								bool					  SkipCid,
								GcVersion				  UseGCVersion,
								uint32_t				  CompactBlockUsageThresholdPercent,
								bool					  Verbose,
								bool					  SingleThreaded);
	void		 ScrubStorage(bool DoDelete, bool SkipCid, std::chrono::seconds TimeSlice);
	LoggerRef	 Log() { return m_Log; }
	// DiskWriteBlocker implementation
	virtual bool AreDiskWritesAllowed() const override { return !m_AreDiskWritesBlocked.load(); }
	DiskSpace	 CheckDiskSpace();
	void		 AppendGCLog(GcClock::TimePoint GcStartTime, const GcSettings& Settings, const GcResult& Result);

	LoggerRef		   m_Log;
	GcManager&		   m_GcManager;
	GcSchedulerConfig  m_Config;
	GcClock::TimePoint m_LastGcTime{};
	GcClock::TimePoint m_LastLightweightGcTime{};
	GcClock::TimePoint m_LastGcExpireTime{};

	std::chrono::milliseconds m_LastFullGcDuration{};
	GcStorageSize			  m_LastFullGCDiff;
	std::chrono::milliseconds m_LastLightweightGcDuration{};
	GcStorageSize			  m_LastLightweightGCDiff;

	std::optional<GcResult> m_LastLightweightGCV2Result;
	std::optional<GcResult> m_LastFullGCV2Result;

	// Holds a GcSchedulerStatus value (see Status())
	std::atomic_uint32_t			  m_Status{};
	std::thread						  m_GcThread;
	mutable std::mutex				  m_GcMutex;
	std::condition_variable			  m_GcSignal;
	// Pending explicit requests; empty when nothing has been requested
	std::optional<TriggerGcParams>	  m_TriggerGcParams;
	std::optional<TriggerScrubParams> m_TriggerScrubParams;
	std::atomic_bool				  m_AreDiskWritesBlocked = false;

	TCasLogFile<DiskUsageWindow::DiskUsageEntry> m_DiskUsageLog;
	DiskUsageWindow								 m_DiskUsageWindow;

	RwLock m_GcLogLock;
};

// NOTE(review): presumably an empty function referenced from elsewhere to force this translation unit
// to be linked - confirm against the definition
void gc_forcelink();

}  // namespace zen

/**
 * fmt formatter that renders a zen::GcClock::TimePoint as local time in "yyyy-mm-ddThh:mm:ss" form.
 *
 * If the time point cannot be converted to local time, or the result does not fit the fixed size
 * buffer (e.g. a year beyond 9999), the text "<invalid time>" is emitted instead.
 */
template<>
struct fmt::formatter<zen::GcClock::TimePoint> : formatter<string_view>
{
	// const-qualified: newer fmt versions require formatter::format to be callable on a const formatter
	template<typename FormatContext>
	auto format(const zen::GcClock::TimePoint& TimePoint, FormatContext& ctx) const
	{
		const std::time_t Time = std::chrono::system_clock::to_time_t(TimePoint);

		// Convert into a caller-owned std::tm. std::localtime returns a pointer to shared static storage,
		// which is a data race when time points are formatted from several threads at once.
		std::tm LocalTime{};
#if defined(_WIN32)
		const bool IsValid = localtime_s(&LocalTime, &Time) == 0;
#else
		const bool IsValid = localtime_r(&Time, &LocalTime) != nullptr;
#endif

		char		TimeString[std::size("yyyy-mm-ddThh:mm:ss")];
		const char* Text = "<invalid time>";

		// strftime returns 0 when the output does not fit, leaving the buffer contents indeterminate
		if (IsValid && std::strftime(std::data(TimeString), std::size(TimeString), "%FT%T", &LocalTime) != 0)
		{
			Text = TimeString;
		}

		return fmt::format_to(ctx.out(), "{}", Text);
	}
};
